package com.atguigu.chapter11;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Table API demo: registers the file {@code input/sensor.txt} as a temporary table
 * named {@code sensor} and prints the sum of {@code vc} grouped by {@code id}.
 *
 * @Author lizhenchao@atguigu.cn
 * @Date 2021/6/19 9:26
 */
public class Flink02_Table_Connector_File {
    /**
     * Entry point: sets up the environments, registers the table, runs the aggregation.
     *
     * @param args command-line arguments (not used)
     * @throws Exception declared for any failure raised while building or running the job
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the stream execution environment and set its parallelism to 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Create the streaming table environment on top of it
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 3. Register the file-backed dynamic table
        registerSensorTable(tEnv);

        // 4. Sum "vc" for every "id"
        Table sumPerId = tEnv
            .from("sensor")
            .groupBy($("id"))
            .select($("id"), $("vc").sum());

        // Printing the result is only meant for testing
        sumPerId.execute().print();
    }

    /**
     * Registers the temporary table {@code sensor}, read from {@code input/sensor.txt}
     * with the CSV format (fields separated by ',', lines separated by "\n").
     *
     * @param tEnv table environment in which the table is registered
     */
    private static void registerSensorTable(StreamTableEnvironment tEnv) {
        // Metadata (column names and types) of the dynamic table
        Schema sensorSchema = new Schema();
        sensorSchema.field("id", DataTypes.STRING());
        sensorSchema.field("ts", DataTypes.BIGINT());
        sensorSchema.field("vc", DataTypes.INT());

        FileSystem source = new FileSystem().path("input/sensor.txt");
        Csv format = new Csv().fieldDelimiter(',').lineDelimiter("\n");

        tEnv
            .connect(source)
            .withFormat(format)
            .withSchema(sensorSchema)
            .createTemporaryTable("sensor"); // name of the dynamic table
    }
}
